package AdsClick;

import bean.AdsClickLog;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author Spring_Hu
 * @date 2021/10/12 22:09
 */
public class AdsClick {
    public static void main(String[] args) throws Exception {
        // 各省份页面广告点击量实时统计
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("test/AdClickLog.csv")
                .map(value ->new AdsClickLog(Long.parseLong(value.split(",")[0]),
                        Long.parseLong(value.split(",")[1]),value.split(",")[2],
                        value.split(",")[3],Long.parseLong(value.split(",")[4])))
                .returns(Types.GENERIC(AdsClickLog.class))
                .process(new ProcessFunction<AdsClickLog, Tuple2<String,Long>>() {
                    @Override
                    public void processElement(AdsClickLog value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        out.collect(Tuple2.of(value.getProvince()+"_"+value.getAdId(),1L));
                    }
                })
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
